/**
 * FileName: RuleActionProcessImpl
 * Author:   SAMSUNG-PC 孙中军
 * Date:     2019/2/14 11:17
 * Description: Filters records according to an external configuration file;
 *              by default the "value" column is the one filtered.
 */
package cn.com.bonc.process.impl;

import cn.com.bonc.process.Process;
import cn.com.bonc.util.DataFilterAndOperatorUtil;
import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import scala.Tuple2;

import java.sql.Timestamp;

/**
 * Rule-based filtering step driven by an external configuration file.
 *
 * <p>Distinguishes the data source by column count: an HDFS source exposes only
 * the single {@code value} column, whereas a Kafka source carries extra columns
 * (timestamp etc.). Kafka records keep their message timestamp through the
 * filter; HDFS records are filtered as plain strings.
 */
public class RuleActionProcessImpl implements Process {

    @Override
    public Dataset<Row> processing(Dataset<Row> rowDataset) {
        // More than one column => Kafka source (HDFS input has only "value").
        boolean hasTimestampColumn = rowDataset.columns().length > 1;
        return hasTimestampColumn ? filterKafkaRecords(rowDataset) : filterHdfsRecords(rowDataset);
    }

    /**
     * Filters Kafka records while preserving each message's timestamp.
     * A record is dropped when the filter util maps its value to the literal
     * string "null" (the util's rejection sentinel — see DataFilterAndOperatorUtil).
     */
    private Dataset<Row> filterKafkaRecords(Dataset<Row> source) {
        return source
                .selectExpr("CAST(value AS STRING)", "CAST(timestamp AS TIMESTAMP)")
                .as(Encoders.tuple(Encoders.STRING(), Encoders.TIMESTAMP()))
                .map(
                        (MapFunction<Tuple2<String, Timestamp>, Tuple2<String, Timestamp>>) pair ->
                                new Tuple2<>(DataFilterAndOperatorUtil.getInstance().filter(pair._1), pair._2),
                        Encoders.tuple(Encoders.STRING(), Encoders.TIMESTAMP()))
                .filter((FilterFunction<Tuple2<String, Timestamp>>) pair -> !"null".equals(pair._1))
                .toDF("value", "timestamp");
    }

    /**
     * Filters single-column HDFS records as plain strings via the shared
     * filter util, keeping only the "value" column of the result.
     */
    private Dataset<Row> filterHdfsRecords(Dataset<Row> source) {
        Dataset<String> values = source
                .selectExpr("CAST(value AS STRING)")
                .as(Encoders.STRING());
                // Former code-based filter kept for reference: only rows with the
                // standard field count (26 '|'-separated columns) passed.
                //.filter((FilterFunction<String>)x-> x.split("[|]", -1).length==26);
        return DataFilterAndOperatorUtil
                .getInstance()
                .filter(values)
                .select("value")
                .as(Encoders.bean(Row.class));
    }
}
